#!/bin/bash

# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


# Entrypoint script for yarn container.

docker_name="${FRAMEWORK_NAME//\~/-}-$CONTAINER_ID"
export PAI_DEFAULT_FS_URI={{ hdfsUri }}

exec 13>$LAUNCHER_LOG_DIR/runtime.yarn.pai.log
BASH_XTRACEFD=13

RUNTIME_YARN_ERR_LOG=$LAUNCHER_LOG_DIR/runtime.yarn.pai.error
RUNTIME_YARN_LOG=$LAUNCHER_LOG_DIR/runtime.yarn.pai.log
RUNTIME_DOCKER_ERR_LOG=$LAUNCHER_LOG_DIR/runtime.docker.pai.error
RUNTIME_DOCKER_LOG=$LAUNCHER_LOG_DIR/runtime.docker.pai.log
USER_STDOUT=$LAUNCHER_LOG_DIR/user.pai.stdout
USER_STDERR=$LAUNCHER_LOG_DIR/user.pai.stderr
USER_LOG_ALL=$LAUNCHER_LOG_DIR/user.pai.all
DISK_CLEANER_ERROR_FILE=$LAUNCHER_LOG_DIR/diskCleaner.pai.error
RUNTIME_AGG_ERROR_FILE=$LAUNCHER_LOG_DIR/runtime.pai.agg.error

touch $RUNTIME_YARN_ERR_LOG
touch $RUNTIME_YARN_LOG

MAX_SOLUTION_SIZE=384
MAX_REASON_SIZE=384

MAX_USER_LOG_SIZE=2048
MAX_RUNTIME_YARN_LOG_SIZE=896
MAX_RUNTIME_DOCKER_LOG_SIZE=896
MAX_DISK_CLEANER_LOG_SIZE=768

EXIT_SUCCEED=0
CONTAINER_SIGTERM_ERROR=143
DOCKER_DEAMON_ERROR=193
CONTAINER_OOM_ERROR=196
CONTAINER_OOD_ERROR=198
RUNTIME_UNRECOGNIZED_ERROR=255

SIG_2="SIGINT"
SIG_4="SIGILL"
SIG_6="SIGABRT"
SIG_7="SIGBUS"
SIG_8="SIGFPE"
SIG_9="SIGKILL"
SIG_11="SIGSEGV"
SIG_13="SIGPIPE"
SIG_15="SIGTERM"


function log_runtime_error()
{
  local level=$1
  local message_type=$2
  local error_message=$3
  local timestamp=$(date +%s)

  printf "%s\n" "$timestamp $level $message_type $error_message" >> $RUNTIME_YARN_ERR_LOG
}


function get_user_exit_code()
{
  local user_exit_code=$(grep -o "INFO USER_EXIT_CODE.*" $RUNTIME_YARN_ERR_LOG | tail -n 1 | grep -o "[0-9]*")
  echo "$user_exit_code"
}


function write_error_summary()
{
  local exit_code=$1
  local trigger=$2
  local parse_file=$3
  local docker_status=$4
  local caught_signal=$5

  local user_exit_code=$(get_user_exit_code)
  if [[ $exit_code == "" ]]; then
    if [[ $user_exit_code != "" && $user_exit_code -gt 0 ]]; then
      exit_code=$RUNTIME_UNRECOGNIZED_ERROR
    else
      exit_code=$EXIT_SUCCEED
    fi
  fi

  echo "exitcode: ${exit_code}" >> $RUNTIME_AGG_ERROR_FILE
  echo "trigger: $trigger" >> $RUNTIME_AGG_ERROR_FILE

  if [[ $parse_file != "" ]]; then
    local reason=$(grep "ERROR REASON" $parse_file |  tail -n 1 | grep -o '".*"' | sed 's/"//g' | head -c "$MAX_REASON_SIZE")
    local solution=$(grep "ERROR SOLUTION" $parse_file |  tail -n 1 | grep -o '".*"' | sed 's/"//g' | head -c "$MAX_SOLUTION_SIZE")
  fi

  echo "reason: |" >> $RUNTIME_AGG_ERROR_FILE
  echo "  $reason" >> $RUNTIME_AGG_ERROR_FILE

  echo "solution: |" >> $RUNTIME_AGG_ERROR_FILE
  echo "  $solution" >> $RUNTIME_AGG_ERROR_FILE

  echo "originalUserExitCode: $user_exit_code" >> $RUNTIME_AGG_ERROR_FILE

  echo "dockerStatus: $docker_status" >> $RUNTIME_AGG_ERROR_FILE
  echo "caughtSignal: $caught_signal" >> $RUNTIME_AGG_ERROR_FILE

  # Not supported yet
  echo "caughtException:" >> $RUNTIME_AGG_ERROR_FILE
  echo "matchedUserLogString:" >> $RUNTIME_AGG_ERROR_FILE
}


function printf_file_tail_with_shift()
{
  local file_name=$1
  local shift_space=$2
  local tail_char_num=$3

  if [[ $tail_char_num == "" ]]; then
    tail_num=256
  fi

  local file_content=$(tail -c "$tail_char_num" "$file_name")
  while IFS= read -r line
  do
    line=$(echo $line | sed -e 's/^[[:blank:]]*//' | tr -d '\n\r' | tr -cd '[:print:]')
    echo "$shift_space$line"
  done < <(printf '%s\n' "$file_content")
}


function get_received_signal()
{
  local exit_code=$1
  local signal=$(($exit_code-128))
  local signal_name=SIG_$signal
  echo ${!signal_name}
}


function generate_error_summary()
{
  local user_exit_code=$(get_user_exit_code)
  local signal=$(get_received_signal $user_exit_code)

  if [[ -f $DISK_CLEANER_ERROR_FILE ]]; then
    grep "ACTION \"KILL\"" $DISK_CLEANER_ERROR_FILE 2>&1 > /dev/null
    if [[ $? -eq 0 ]]; then
      write_error_summary $CONTAINER_OOD_ERROR "diskCleanerErrorLog" $DISK_CLEANER_ERROR_FILE
      return
    fi
  fi

  if [[ -f $RUNTIME_DOCKER_ERR_LOG ]]; then
    grep "ERROR REASON" $RUNTIME_DOCKER_ERR_LOG | grep "User process exited with code" 2>&1 > /dev/null
    if [[ $? -eq 0 && -z $signal ]]; then
      write_error_summary $RUNTIME_UNRECOGNIZED_ERROR "originalUserExitCode" $RUNTIME_DOCKER_ERR_LOG
      return
    fi
  fi

  # We assume 125 is a docker error, and retry it without burning user's retry count
  if [[ $user_exit_code -eq 125 ]]; then
    printf "[DEBUG] Get exit code $user_exit_code, we will treat it as $DOCKER_DEAMON_ERROR\n"
    write_error_summary $DOCKER_DEAMON_ERROR "dockerStatus" "" "DeamonError"
    return
  fi

  if [[ -f $RUNTIME_YARN_ERR_LOG ]]; then
    grep "ERROR REASON" $RUNTIME_YARN_ERR_LOG | grep "docker deamon error" 2>&1 > /dev/null
    if [[ $? -eq 0 ]]; then
      write_error_summary $DOCKER_DEAMON_ERROR "dockerStatus" $RUNTIME_YARN_ERR_LOG "DeamonError"
      return
    fi

    grep "ERROR REASON" $RUNTIME_YARN_ERR_LOG | grep "out of memory (OOM)" 2>&1 > /dev/null
    if [[ $? -eq 0 ]]; then
      write_error_summary $CONTAINER_OOM_ERROR "caughtSignal" $RUNTIME_YARN_ERR_LOG "" $signal
      return
    fi
  fi

  if [[ -n $signal ]]; then
    write_error_summary $user_exit_code "caughtSignal" "" "" $signal
    return
  fi

  # write empty error summary for successful exit
  write_error_summary
}


function generate_agg_err()
{
  # Here we persume the kill event trigger seq is diskCleaner, runtime and user error
  printf "[DEBUG] Generate aggregate dynamic error info\n"

  printf "[PAI_RUNTIME_ERROR_START]\n" >> $RUNTIME_AGG_ERROR_FILE
  generate_error_summary

  echo "errorLogs:" >> $RUNTIME_AGG_ERROR_FILE
  echo "  user: |" >> $RUNTIME_AGG_ERROR_FILE
  if [[ -f $USER_STDERR ]]; then
    printf "%s\n" \
      "$(printf_file_tail_with_shift $USER_STDERR "    " $MAX_USER_LOG_SIZE)" >> $RUNTIME_AGG_ERROR_FILE
  fi

  echo "  runtime: |" >> $RUNTIME_AGG_ERROR_FILE
  if [[ -f $RUNTIME_DOCKER_ERR_LOG ]]; then
    printf "%s\n" \
      "$(printf_file_tail_with_shift $RUNTIME_DOCKER_ERR_LOG "    " $MAX_RUNTIME_DOCKER_LOG_SIZE)" >> $RUNTIME_AGG_ERROR_FILE
  fi

  if [[ -f $RUNTIME_YARN_ERR_LOG ]]; then
    printf "%s\n" \
      "$(printf_file_tail_with_shift $RUNTIME_YARN_ERR_LOG "    " $MAX_RUNTIME_YARN_LOG_SIZE)" >> $RUNTIME_AGG_ERROR_FILE
  fi

  echo "  diskCleaner: |" >> $RUNTIME_AGG_ERROR_FILE
  if [[ -f $DISK_CLEANER_ERROR_FILE ]]; then
    printf "%s\n" \
      "$(printf_file_tail_with_shift $DISK_CLEANER_ERROR_FILE "    " $MAX_DISK_CLEANER_LOG_SIZE)" >> $RUNTIME_AGG_ERROR_FILE
  fi
  printf "[PAI_RUNTIME_ERROR_END]\n" >> $RUNTIME_AGG_ERROR_FILE
}


function exit_handler()
{
  rc=$?

  log_runtime_error "INFO" "USER_EXIT_CODE" "$rc"
  local received_signal=$(get_received_signal $rc)
  if [[ -n $received_signal ]]; then
    log_runtime_error "ERROR" "MESSAGE" "Container exited because of the OS signal"
    log_runtime_error "ERROR" "RECEIVE_SIGNAL" "\"$received_signal\""
  fi

  if [[ ${killed_by_yarn} != true && $received_signal == "SIGTERM" || $received_signal == "SIGKILL" ]]; then
    printf "[DEBUG] Exit code $rc, it may be due to out of memory (OOM) or other critical errors.\n"
    log_runtime_error "ERROR" "REASON" "\"Current container is killed, may be due to out of memory (OOM) or other critical errors.\""
  fi

  # After all necessary steps, do a best effort kill here
  pid=$(docker inspect --format={{{ inspectPidFormat }}} $docker_name 2>/dev/null)
  if [[ -n "$pid" && "$pid" -gt 0 ]]; then
    kill -15 $(ps -s $pid -o pid=) &&\
      printf "[DEBUG] Docker container $docker_name killed successfully." ||\
      printf "[DEBUG] Try to kill the container $docker_name but failed. Maybe it has already exited."
  fi
  generate_agg_err

  rc=$(grep "exitcode:" $RUNTIME_AGG_ERROR_FILE | tail -n 1 | grep -o '[0-9]*')
  if [[ $rc -eq "" ]]; then
    rc=0
  fi

  printf "[DEBUG] Write exit code $rc to file /var/lib/hadoopdata/nm-local-dir/nmPrivate/$APP_ID/$CONTAINER_ID/$CONTAINER_ID.pid.exitcode.\n"
  echo $rc > "/var/lib/hadoopdata/nm-local-dir/nmPrivate/$APP_ID/$CONTAINER_ID/$CONTAINER_ID.pid.exitcode"

  exit $rc
} 1>> $RUNTIME_YARN_LOG 2>> $RUNTIME_YARN_ERR_LOG

{
set -x
PS4="+[\t] "
trap exit_handler EXIT


alive_dir="tmp/pai-root/alive"

mkdir -p $alive_dir
while /bin/true; do
  touch "$alive_dir/yarn_$CONTAINER_ID"
  sleep 20
  [ -f "$alive_dir/docker_$CONTAINER_ID" ] && [ ! "$(docker ps | grep $docker_name)" ] && break
done &

gpu_id=''
nvidia_devices=''
{{# taskData.gpuNumber }}
nvidia_devices+='--device=/dev/nvidiactl --device=/dev/nvidia-uvm'
gpu_bitmap=$(perl -e 'printf "%b",'$CONTAINER_GPUS)
for (( i=0,j=$((${#gpu_bitmap}-1)); i<${#gpu_bitmap}; i++,j-- )); do
  if [ ${gpu_bitmap:$i:1} -gt 0 ]; then
    gpu_id+="$j,"
    nvidia_devices+=" --device=/dev/nvidia$j"
  fi
done
gpu_id=$(echo $gpu_id | sed "s/,$//g")
{{/ taskData.gpuNumber }}
printf "%s %s\n%s\n\n" "[INFO]" "GPU_ID" "$gpu_id"
printf "%s %s\n%s\n\n" "[INFO]" "NVIDIA_DEVICES" "$nvidia_devices"

nvidia_flags=''
# check docker runtime
default_runtime=$(docker info --format {{{ infoDefaultRuntimeFormat }}})
if [ "$default_runtime" = '"nvidia"' ]; then
  nvidia_flags+="--runtime=nvidia --env NVIDIA_VISIBLE_DEVICES=$gpu_id"
else
  nvidia_flags+="--runtime=runc --env NVIDIA_VISIBLE_DEVICES=void $nvidia_devices"
fi

infiniband_flags=''
if [[ -d "/dev/infiniband" ]]; then
  infiniband_flags+='--cap-add=IPC_LOCK --device=/dev/infiniband'
fi

{{# jobData.auth }}
set +x
docker login --username {{ jobData.auth.username }} --password {{ jobData.auth.password }} {{ jobData.auth.registryuri }} \
  || { echo 'Authorized failed' ; exit 1; }
set -x
{{/ jobData.auth }}

{{# jobData.authFile }}
IFS=$'\r\n' GLOBIGNORE='*' \
  eval 'cred=($(hdfs dfs -cat {{ jobData.authFile }}))' \
  || { echo 'Read authFile failed' ; exit 1; }
docker login --username ${cred[1]} --password ${cred[2]} ${cred[0]} \
  || { echo 'Authorized failed' ; exit 1; }
{{/ jobData.authFile }}

ssh_config_file="tmp/pai-root/ssh_config"
mkdir -p ${ssh_config_file}
# Get container host list from framework launcher webservice
function get_host_list()
{
  delay_time=20
  info_source="webhdfs"
  while true; do
    if [[ "$info_source" = "webhdfs" ]]; then
      taskrole_statuses=$(curl -s -L -H "Accept: application/json" {{ frameworkInfoWebhdfsUri }} | jq -r ".aggregatedFrameworkStatus.aggregatedTaskRoleStatuses")
    else
      taskrole_statuses=$(curl -sS -X GET -H "Accept: application/json" {{ aggregatedStatusUri }} | jq -r ".aggregatedTaskRoleStatuses")
    fi

    jq -re ".[].taskStatuses.taskStatusArray[].containerIp" <<< $(echo $taskrole_statuses) > /dev/null
    if [ $? -eq 0 ]; then
      null_line=$(jq -r ".[].taskStatuses.taskStatusArray[].containerIp" <<< $(echo $taskrole_statuses) | grep -Ecv "([0-9]{1,3}.?){4}")
      if [ $null_line -eq 0 ]; then
        break
      fi
    fi

    printf "%s %s\n" "[WARNING]" "Failed to request info from $info_source, sleep $delay_time s ..."
    sleep ${delay_time}s
    if [[ $delay_time -ge 300 ]]; then
      if [[ "$info_source" = "webhdfs" ]]; then
        delay_time=60
        info_source="restapi"
      else
        exit 205
      fi
    else
      if [[ "$info_source" = "webhdfs" ]]; then
        delay_time=$((delay_time + 20))
      else
        delay_time=$((delay_time + 60))
      fi
    fi
  done
  for taskrole in $(echo $taskrole_statuses |  jq -r ".|keys[]"); do
    host_list=''
    while read container_ip && read container_ports && read task_index; do
      echo PAI_HOST_IP_${taskrole}_${task_index}=${container_ip} >> $1
      host_list+=${container_ip}:$(echo $container_ports | grep -o "http:[^,;]*" | cut -f 2 -d":"),
      ssh_port=$(echo $container_ports | grep -o "ssh:[^,;]*" | cut -f 2 -d":")
      printf "%s\n  %s\n  %s\n  %s\n  %s\n  %s\n  %s\n" \
        "Host ${taskrole}-${task_index}" \
        "HostName ${container_ip}" \
        "Port ${ssh_port}" \
        "User root" \
        "StrictHostKeyChecking no" \
        "UserKnownHostsFile /dev/null" \
        "IdentityFile /root/.ssh/"{{ jobData.jobName }} >> ${ssh_config_file}/config
      IFS=';' read -r -a ports_arr <<< ${container_ports}
      for port in ${ports_arr[@]}; do
        IFS=':' read -r -a port_info <<< $port
        if [[ ${#port_info[@]} -ge 2 ]]; then
          echo PAI_PORT_LIST_${taskrole}_${task_index}_${port_info[0]}=${port_info[1]} >> $1
          # Backward compatibility
          echo PAI_${taskrole}_${task_index}_${port_info[0]}_PORT=${port_info[1]} >> $1
        fi
      done
    done < <(echo $taskrole_statuses | jq -r ".\"${taskrole}\".taskStatuses.taskStatusArray[] | .containerIp, .containerPorts, .taskIndex")
    echo PAI_TASK_ROLE_TASK_COUNT_${taskrole}=$(echo $taskrole_statuses | jq ".\"${taskrole}\".taskStatuses.taskStatusArray | length") >> $1
    # Backward compatibility
    host_list=$(echo $host_list | sed "s/,$//g")
    echo PAI_TASK_ROLE_${taskrole}_HOST_LIST=$host_list >> $1
  done
}

# Prepare env file
ENV_LIST=env.list

# Yarn container environment variables
echo APP_ID=$APP_ID >> $ENV_LIST
echo FRAMEWORK_NAME=$FRAMEWORK_NAME >> $ENV_LIST
echo HADOOP_USER_NAME=$HADOOP_USER_NAME >> $ENV_LIST
echo PAI_TASK_INDEX=$TASK_INDEX >> $ENV_LIST
echo PAI_CONTAINER_HOST_IP=$CONTAINER_IP >> $ENV_LIST
echo PAI_CONTAINER_HOST_PORT_LIST=$CONTAINER_PORTS >> $ENV_LIST
echo PAI_CONTAINER_ID=$CONTAINER_ID >> $ENV_LIST

# PAI environment variables
PAI_TEMP={{ jobData.jobName }} && echo PAI_JOB_NAME=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ jobData.userName }} && echo PAI_USER_NAME=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ hdfsUri }} && echo PAI_DEFAULT_FS_URI=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRolesNumber }} && echo PAI_TASK_ROLE_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRoleList }} && echo PAI_TASK_ROLE_LIST=$PAI_TEMP >> $ENV_LIST
PAI_TEMP="{{ taskData.gpuNumber }},{{ taskData.cpuNumber }},{{ taskData.memoryMB }},{{ taskData.shmMB }}" \
  && echo PAI_RESOURCE=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.minFailedTaskCount }} && echo PAI_MIN_FAILED_TASK_COUNT_{{ taskData.name }}=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.minSucceededTaskCount }} && echo PAI_MIN_SUCCEEDED_TASK_COUNT_{{ taskData.name }}=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.name }} && echo PAI_CURRENT_TASK_ROLE_NAME=$PAI_TEMP >> $ENV_LIST
echo PAI_CURRENT_TASK_ROLE_CURRENT_TASK_INDEX=$TASK_INDEX >> $ENV_LIST

port_list=(${CONTAINER_PORTS//;/ })
for ports in "${port_list[@]}"; do
  port_label="$(cut -f 1 -d":" <<< $ports)"
  export PAI_CONTAINER_HOST_${port_label}_PORT_LIST="$(cut -f 2 -d":" <<< $ports)"
  echo PAI_CONTAINER_HOST_${port_label}_PORT_LIST="$(cut -f 2 -d":" <<< $ports)" >> $ENV_LIST
done

echo PAI_CONTAINER_HOST_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_http_PORT_LIST)" >> $ENV_LIST
echo PAI_CONTAINER_SSH_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_ssh_PORT_LIST)" >> $ENV_LIST

get_host_list $ENV_LIST

# Backward compatibility
echo \# Backward compatibility >> $ENV_LIST
echo PAI_CURRENT_CONTAINER_IP=$CONTAINER_IP >> $ENV_LIST
echo PAI_TASK_ROLE_INDEX=$TASK_INDEX >> $ENV_LIST
PAI_TEMP={{ jobData.userName }} && echo PAI_USERNAME=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ jobData.dataDir }} && echo PAI_DATA_DIR=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ jobData.outputDir }} && echo PAI_OUTPUT_DIR=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ jobData.codeDir }} && echo PAI_CODE_DIR=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.name }} && echo PAI_TASK_ROLE_NAME=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.taskNumber }} && echo PAI_TASK_ROLE_NUM=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.taskNumber }} && echo PAI_CURRENT_TASK_ROLE_TASK_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.cpuNumber }} && echo PAI_TASK_CPU_NUM=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.cpuNumber }} && echo PAI_CURRENT_TASK_ROLE_CPU_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.memoryMB }} && echo PAI_TASK_MEM_MB=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.memoryMB }} && echo PAI_CURRENT_TASK_ROLE_MEM_MB=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.gpuNumber }} && echo PAI_TASK_GPU_NUM=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.gpuNumber }} && echo PAI_CURRENT_TASK_ROLE_GPU_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.shmMB }} && echo PAI_CURRENT_TASK_ROLE_SHM_MB=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.minFailedTaskCount }} && echo PAI_CURRENT_TASK_ROLE_MIN_FAILED_TASK_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskData.minSucceededTaskCount }} && echo PAI_CURRENT_TASK_ROLE_MIN_SUCCEEDED_TASK_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ tasksNumber }} && echo PAI_TASKS_NUM=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ tasksNumber }} && echo PAI_JOB_TASK_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRolesNumber }} && echo PAI_TASK_ROLES_NUM=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRolesNumber }} && echo PAI_JOB_TASK_ROLE_COUNT=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRoleList }} && echo PAI_TASK_ROLE_LIST=$PAI_TEMP >> $ENV_LIST
PAI_TEMP={{ taskRoleList }} && echo PAI_JOB_TASK_ROLE_LIST=$PAI_TEMP >> $ENV_LIST
echo PAI_CURRENT_CONTAINER_PORT="$(cut -f 1 -d"," <<< $PAI_CONTAINER_HOST_http_PORT_LIST)" >> $ENV_LIST

export $(grep -v '^#' $ENV_LIST | xargs)
# put it after the 'export', so we won't source it into yarn container.
# we want job environment parameters to be available only in job container.
{{# jobEnvs }}
PAI_TEMP={{ key }}={{ value }} && echo $PAI_TEMP >> $ENV_LIST
{{/ jobEnvs }}

# Prepare docker bootstrap script
bootstrap_dir="tmp/pai-root/bootstrap"
mkdir -p $bootstrap_dir
hdfs dfs -get {{ hdfsUri }}/Container/$HADOOP_USER_NAME/$FRAMEWORK_NAME/DockerContainerScripts/{{ idx }}.sh $bootstrap_dir/docker_bootstrap.sh \
  || { echo "Can not get script from HDFS."; exit 1; }

# Prepare user code
code_dir="tmp/pai-root/code"
mkdir -p $code_dir
if [[ -n $PAI_CODE_DIR ]]; then
  hdfs dfs -stat $PAI_CODE_DIR \
    || { echo "Can not stat $PAI_CODE_DIR"; exit 1; }
  code_dir_size="$(hdfs dfs -du -s $PAI_CODE_DIR | cut -d ' ' -f 1)"
  printf "%s %s\n" "[INFO]" "User code_dir_size is ${code_dir_size}"
  # TODO: Make the threshold configurable
  if [ $code_dir_size -gt 0 ] && [ $code_dir_size -le 200000000 ]; then
    hdfs dfs -get $PAI_CODE_DIR $code_dir \
      || { echo "Can not get code from HDFS."; exit 1; }
  elif [ $code_dir_size -eq 0 ]; then
    # pass when code_dir_size is 0
    :
  elif [ $code_dir_size -lt 0 ]; then
    echo "$PAI_CODE_DIR is less than 0MB, exit."
    exit 1
  else
    echo "$PAI_CODE_DIR is larger than 200MB, exit."
    exit 1
  fi
fi

# Prepare docker debug log
log_dir="tmp/pai-root/log"
mkdir -p $log_dir
ln -s /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/runtime.docker.pai.error $RUNTIME_DOCKER_ERR_LOG
ln -s /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/runtime.docker.pai.log $RUNTIME_DOCKER_LOG
ln -s /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/user.pai.stderr $USER_STDERR
ln -s /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/user.pai.stdout $USER_STDOUT
ln -s /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/user.pai.all $USER_LOG_ALL
ln -s $DISK_CLEANER_ERROR_FILE /$LOCAL_DIRS/$CONTAINER_ID/$log_dir/diskCleaner.pai.error

# copy the hadoop configuration file
hadoop_config_dir="hadoop_config"
mkdir -p $hadoop_config_dir
cp -r /hadoop-configuration-for-jobs/* $hadoop_config_dir

# retrieve the yarn local dir
hadoop_tmp_dir="/var/lib/hadoopdata"
container_id=$(cat /proc/self/cgroup | grep "memory" | awk -F '/' '{print $NF}')
mounted_path=$(docker inspect $container_id |\
  jq -r --arg hadoop_tmp_dir "$hadoop_tmp_dir" '.[] | .Mounts | .[] | select(.Destination==$hadoop_tmp_dir) | .Source')
container_local_dir=$mounted_path/nm-local-dir/usercache/{{ jobData.userName }}/appcache/$APP_ID/$CONTAINER_ID

# pull docker image and run
docker pull {{ jobData.image }} \
  || { echo "Can not pull Docker image"; exit 1; }

## ATTENTION: do not specify `--pid=host` when run job, otherwise job-exporter can not get right
## network consumption
docker run --name $docker_name \
  --init \
  --tty \
  --rm \
  --privileged=false \
  --oom-score-adj=1000 \
  --cap-add=SYS_ADMIN \
  --cap-add=DAC_READ_SEARCH \
  --cap-drop=MKNOD \
  --network=host \
  --cpus={{ taskData.cpuNumber }} \
  --memory={{ taskData.memoryMB }}m \
  --shm-size={{ taskData.shmMB }}m \
  $nvidia_flags \
  $infiniband_flags \
  --device=/dev/fuse \
{{# isDebug }}
  --cap-add=SYS_PTRACE \
  --security-opt seccomp:unconfined \
{{/ isDebug }}
  --security-opt apparmor:unconfined \
{{# reqAzRDMA }}
{{# azRDMA }}
  --volume /var/lib/hyperv:/var/lib/hyperv \
  --volume /etc/dat.conf:/etc/dat.conf \
  --ulimit memlock=-1  \
{{/ azRDMA }}
{{/ reqAzRDMA }}
  --volume $container_local_dir/$alive_dir:/alive \
  --volume $container_local_dir/$alive_dir/yarn_$CONTAINER_ID:/alive/yarn_$CONTAINER_ID:ro \
  --volume $container_local_dir/$log_dir:/pai/log \
  --volume $container_local_dir/$bootstrap_dir:/pai/bootstrap:ro \
  --volume $container_local_dir/$code_dir:/pai/code:ro \
  --volume /var/drivers/nvidia/current:/usr/local/nvidia:ro \
  --volume $container_local_dir/$hadoop_config_dir:/etc/hadoop \
  --volume $container_local_dir/$ssh_config_file:/pai/ssh_config/:ro \
  --label GPU_ID=$gpu_id \
  --label PAI_HOSTNAME="$(hostname)" \
  --label PAI_JOB_NAME={{ jobData.jobName }} \
  --label PAI_JOB_VC_NAME={{ jobData.virtualCluster }} \
  --label PAI_USER_NAME={{ jobData.userName }} \
  --label PAI_CURRENT_TASK_ROLE_NAME={{ taskData.name }} \
  --env-file $ENV_LIST \
  --entrypoint="" \
   {{ jobData.image }} \
  /bin/bash '/pai/bootstrap/docker_bootstrap.sh'
} 1>> $RUNTIME_YARN_LOG 2>> $RUNTIME_YARN_ERR_LOG
